В современных реалиях все чаще встает вопрос про переход с вендорских продуктах на open-source. Но в вендорских продуктах есть инструменты моделирования объектов и etl, которые при переходе на open-source теряются. Приходится выдумывать свое решение или искать решения внутри того же open-source, который не всегда адаптирован под конкретные СУБД. Две часто используемые СУБД: Greenplum - мощная MPP (Massively Parallel Processing) СУБД, которая не уступает по производительности зарубежным аналогам и поддерживается российскими вендорами, например, Arenadata DB (ADB) и Clickhouse - колоночная аналитическая СУБД с открытым кодом, позволяющая выполнять аналитические запросы в режиме реального времени на структурированных больших данных.
Но как быстро и эффективно начать миграцию на Greenplum или ClickHouse, сохранив привычные инструменты и подходы? В этой статье я расскажу о DBT Proplum - адаптере для DBT (Data Build Tool), который расширяет возможности работы с Greenplum и ClickHouse, добавляя новые стратегии загрузки данных, логирование и интеграцию с внешними источниками.
DBT (Data Build Tool) - это популярный инструмент для трансформации данных в современных хранилищах. Он позволяет:
DBT не занимается загрузкой сырых данных (это задача ETL-инструментов), а фокусируется на преобразовании уже загруженной информации в аналитические модели.
Но сейчас существуют проблемы с текущим адаптером для Greenplum в DBT. Официальный адаптер Greenplum для DBT не обновлялся более 2 лет и из-за этого не поддерживает актуальные версии DBT Core. Также адаптер никак не использует функциональность партиций для загрузок инкремента. Мы взяли данный адаптер как основу и добавили свою новую функциональность DBT Proplum к нему.
DBT Proplum - это адаптер для DBT Core, который добавляет специализированные материализации и стратегии загрузки данных, оптимизированные для Greenplum и ClickHouse. Он позволяет:
Ключевые особенности:
| Функциональность | Текущая DBT функциональность | Функциональность Proplum |
|---|---|---|
| Логирование | ❌ Никакого логирования внутри БД | ✔️ Логирование процессов работы DBT внутри СУБД |
| Определение периода дельты загрузки | Требуется ручное определение пользователем | ✔️ Автоматическое определение |
| Поддержка партиций | Только при создании таблиц | ✔️ Дополнительная функциональность: |
| - Использование подмены партиций при загрузке дельты | ||
| - Автоматическое создание дополнительных партиций | ||
| Бекап таблиц БД | ❌ Бекап не сохраняется в БД | ✔️ Автоматический бекап таблиц для случая фулл загрузок |
| Новые методы инкрементальных загрузок | ❌ Минимальный набор методов | ✔️ Добавленные новые методы с функциональностью: |
| - Загрузка дельты с подменой партиций | ||
| - Партициниорование внешних таблиц в периоде загрузки дельты | ||
| Интеграция внешних таблиц | ❌ Не используется | ✔️ Поддерживает новый метод материализации моделей external_table |
| Обработка качества данных | Стандартный набор | ✔️ Добавлены проверки: |
| - Проверка дубликат в ключах | ||
| - Отслеживания количества пришедших данных |
При работе загрузок внутри DBT Proplum каждый прогон модели выполняет сохранение логов загрузки в базу данных. Пример логов:
Главные поля в таблице логов:
Одной из ключевых возможностей DBT Proplum является работа с внешними таблицами (external tables), что особенно полезно при организации загрузок из внешних систем.
Пример модели внешней таблицы:
{{ config(
materialized='external_table',
connect_string="pxf://path/to/data",
columns="""
id int8,
name text,
created_at timestamp
"""
) }}
При создании внешних таблиц также поддерживается параметры, которые работая вместе с настройкой дельты целевой таблицы, партицируют данные внешней таблицы в периоде дельты загрузки для лучшей производительности загрузки.
В оригинальном адаптере для Greenplum существовали 3 алгоритма для загрузки инкремента: append, delete+insert, truncate+insert. DBT Proplum добавляет дополнительно 5 новых режима загрузки данных:
| Функциональность | FULL | DELTA UPSERT | DELTA MERGE | DELTA | PARTITIONS |
|---|---|---|---|---|---|
| Короткое описание алгоритма | Удаляются данные из таблицы, вставляются новые данные. Аналогичен truncate+insert методу с добавленным логированием загрузок. | Из целевой таблицы удаляются, которые пересекаются по ключу с пришедшими новыми данными. Вставляются новые данные. Аналогичен delete+insert методу с добавленным логированием загрузок. | Используется в таблицах, где есть партиция по умолчанию. Новые и уже существующие данные в таблице комбинируются в буферную таблицу, буферная таблица затем заменяет партицию по умолчанию | Вставка пришедших новых данных в дельте с автоматическом расширением интервала партицирования. | Аналогично алгоритму DELTA создаются новые партиции. Новые данные в зависимости от настройки модели либо комбинируются с уже существующими данными или берутся как есть. Выполняется замена партиций для каждой партиций, которая затронута новыми данными. |
| Подходит для | Малых таблиц справочников | Таблиц измерений, в которых могут происходить небольшие изменения данных | Больших таблиц измерений | Получение срезов данных или для таблиц с кумулятивной дельтой | Для партицированных таблиц фактов |
Рассмотрим практический пример загрузки данных из внешнего источника в партиционированную таблицу Greenplum с использованием стратегии PARTITIONS.
-- models/staging/ext_sales.sql
{{
config(
materialized='external_table',
connect_string="pxf://data/sales?PROFILE=s3:parquet",
load_method='pxf', -- Использует PXF для чтения данных из S3
model_target='sales_fact',
delta_field='sale_date', -- Поле для дельта-загрузки
safety_period='2 days', -- Учет опоздавших данных
columns="""
order_id bigint,
sale_date timestamp,
amount numeric(18,2),
"""
)
}}
-- models/dwh/sales_fact.sql
{% set partition_def %}
PARTITION BY RANGE (sale_date)
(
START ('2023-01-01'::timestamp) END ('2023-02-01'::timestamp)
EVERY (INTERVAL '1 month'),
DEFAULT PARTITION extra
)
{% endset %}
{{
config(
materialized='proplum',
incremental_strategy='partitions',
delta_field='sale_date',
merge_partitions=true, --флаг, что мы объединяем новые данные из дельты с уже существующими данными в целевой таблице (при false данные из целевой таблицы удаляются если они не пришли в дельте)
merge_keys=['order_id'], --ключ таблицы
raw_partition=partition_def,
fields_string="""
order_id bigint,
sale_date timestamp,
amount numeric(18,2),
"""
)
}}
SELECT * FROM {{ ref('ext_sales') }}
Как это работает:
Внешняя таблица (ext_sales):
Целевая таблица (sales_fact):
Создается с помесячным партицированием
При дельта-загрузке:
Общая схема процесса загрузки данных:
--- title: Процесс загрузки Sales_fact --- flowchart LR A[Начало] --> AA[Создаем внешнею таблицу ext_sales] AA --> B[Создание дельты таблицы] B --> C[Выполняется селект из модели sales_fact в таблицу дельту] C --> F[Создаются недостающие партиции в таблице sales_fact в дельте периоде] F --> G[Выполняем цикл по партициям внутри таблицы sales_fact в дельте периоде] G --> H[Создаем буферную таблицу] H --> I[Вставляем данные из таблицы дельты и sales_fact за период партиции в буферную таблицу] I --> K[ПОдменяем партицию в таблице sales_fact на буферную таблицу] K --> L[Конец цикла] L --> G L --> M[Выполняем Analyze sales_fact таблицы] M --> N[Записываем результат в таблицу логов] N --> O[Конец]
Преимущества подхода:
В стандартном адаптере Clickhouse для DBT реализованы 4 алгоритма для загрузки инкремента: append, delete+insert, insert_overwrite, microbatch. DBT Proplum добавляет дополнительно 3 новых режима загрузки данных:
| Функциональность | FULL | DELTA UPSERT | PARTITIONS |
|---|---|---|---|
| Короткое описание алгоритма | Обмен таблицы модели с таблицей с новыми данными. | Удаляются данные, которые пересекаются по ключу с пришедшими новыми данными. Вставляются новые данные. | Новые данные в зависимости от настройки модели либо комбинируются с уже существующими данными или берутся как есть. Выполняется подмена партиций для каждой партиций, которая затронута новыми данными. |
| Подходит для | Малых таблиц справочников | Таблиц измерений, в которых могут происходить обновление данных | Для партицированных таблиц фактов |
Одним из ключевых преимуществ DBT Proplum является его бесшовная интеграция с оркестраторами, такими как Apache Airflow.
Для работы dbt с airflow мы используем библиотеку airflow_dbt. Эта библиотека позволяет выполнять операции dbt как шаги дага. Например, мы можем с выполнять всем модели одного тага на схеме:
В один шаг (greenplum_tpch) дага:
А последующий шаг (clickhouse_tpch) позволяет сразу же после выполнение трансформаций в Greenplum обновить данные в Clickhouse.
Преимущества такого подхода:
Гибкость: Выполнение моделей можно ставить на расписание. Можно запускать как отдельные модели (--select model_name), так и целые слои (--select tag:ods). При использование тага в селекте аналитик может добавлять модели в загрузку не меняя даг.
Надёжность: Airflow перезапускает упавшие задачи и уведомляет о проблемах.
Масштабируемость: Подходит как для небольших загрузок, так и для сложных ETL-цепочек.
DBT мощный инструмент по генерации таблиц и трансформации данных в СУБД. Дополнения, сделанные в DBT Proplum, расширяют функциональность работы DBT для аналитических баз Greenplum и ClickHouse. Эти дополнения:
Упрощают миграцию с других СУБД за счёт поддержки внешних таблиц.
Добавляют дополнительные стратегии загрузок, заточенные под специфику баз Greenplum и ClickHouse.
Логируют все шаги выполнения DBT внутри СУБД.
Проект доступен в открытом исходном коде, и мы приглашаем сообщество к сотрудничеству!
Если вас заинтересовал наш инструмент, приглашаем посмотреть запись вебинара, посвященного демонстрации работы с фреймворком на живых данных.
Теги: #Greenplum #ClickHouse #DBT #ETL #DataEngineering #OpenSource
Хабы: Блог компании Sapiens solutions, Data Engineering, Хранилища данных, Open Source